package cn.com.acca.ma.service.impl;

import cn.com.acca.ma.common.util.DateUtil;
import cn.com.acca.ma.common.util.PropertiesUtil;
import cn.com.acca.ma.enumeration.PythonApiResult;
import cn.com.acca.ma.enumeration.UpDown;
import cn.com.acca.ma.jpa.util.JpaUtil;
import cn.com.acca.ma.model.ConceptBoard;
import cn.com.acca.ma.model.EtfInfo;
import cn.com.acca.ma.model.EtfTransactionData;
import cn.com.acca.ma.model.StockTransactionData;
import cn.com.acca.ma.thread.AbstractThread;
import cn.com.acca.ma.thread.write.CollectEtfTransactionDataThread_sohu;
import cn.com.acca.ma.service.EtfTransactionDataService;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import javax.persistence.EntityManager;
import javax.persistence.Query;
import java.io.*;
import java.math.BigDecimal;
import java.net.URL;
import java.net.URLConnection;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Properties;

public class EtfTransactionDataServiceImpl extends BaseServiceImpl<EtfTransactionDataServiceImpl, EtfTransactionData> implements EtfTransactionDataService {

    /**
     * Creates the service. The constructor body is empty; no initialization is performed here.
     */
    public EtfTransactionDataServiceImpl() {
    }

    /*********************************************************************************************************************
     *
     * 							从网络上获取数据，并插入到数据库中
     *
     *********************************************************************************************************************/
    /**
     * Collects ETF transaction data from the network (sohu) and stores it in the database.
     * <p>
     * Steps performed here:
     * <ol>
     *   <li>load all rows via {@code etfInfoDao.getAllEtfInfo()};</li>
     *   <li>read {@code etfTransactionData.begin.date} and {@code etfTransactionData.end.date} from the
     *       ETF transaction data properties file;</li>
     *   <li>look up the previous close prices for the begin date;</li>
     *   <li>start one {@link CollectEtfTransactionDataThread_sohu} per ETF, sleeping
     *       {@code thread.interval.sleep.millis} between starts;</li>
     *   <li>poll {@code AbstractThread.etfTransactionDataCollectFinishAmount} until it equals the number of ETFs.</li>
     * </ol>
     * The actual download and insert are presumably done by the worker threads; that code is not visible here.
     * <p>
     * Note carried over from the original documentation: when the etf_transaction_data table is empty, the
     * last_close_price column stays empty after this call and {@code writeEtfTransactionDataLastClosePrice}
     * has to be run afterwards. This is not needed for incremental updates.
     * <p>
     * The method logs and returns early when the properties file cannot be read or when the calling thread
     * is interrupted while sleeping.
     */
    public void insertEtfTransactionData_sohu() {
        logger.info("从网络上获取数据，并插入到数据库");

        List<EtfInfo> etfInfoList = etfInfoDao.getAllEtfInfo();
        if (null == etfInfoList) {
            logger.warn("从etf_info表中没有找到任何记录");
            return;
        }

        // Read the collection window; try-with-resources closes the stream even when opening or loading fails.
        Properties properties = new Properties();
        try (InputStream inputStream = new BufferedInputStream(new FileInputStream(ETF_TRANSACTION_DATA_PROPERTIES))) {
            properties.load(inputStream);
        } catch (IOException e) {
            // Without the properties there is no begin/end date, so nothing meaningful can be collected.
            logger.error("读取配置文件【" + ETF_TRANSACTION_DATA_PROPERTIES + "】失败，无法采集ETF数据", e);
            return;
        }
        String beginDate = properties.getProperty("etfTransactionData.begin.date");
        String endDate = properties.getProperty("etfTransactionData.end.date");
        logger.info("采集ETF数据的开始时间为【" + beginDate + "】，结束时间为【" + endDate + "】");

        EtfTransactionDataService etfTransactionDataService = new EtfTransactionDataServiceImpl();
        // Pause between starting two worker threads.
        int threadIntervalSleepMillis = Integer.parseInt(PropertiesUtil.getValue(THREAD_PROPERTIES, "thread.interval.sleep.millis"));
        String etfInfoCodePrefix = PropertiesUtil.getValue(ETF_TRANSACTION_DATA_PROPERTIES, "etfTransactionData.sohu.code.prefix");

        // Previous close price of every ETF for the begin date; each row is used as {code, price}.
        List<?> lastClosePriceList = etfTransactionDataDao.findLastClosePrice(DateUtil.stringToDate(beginDate));

        for (EtfInfo etfInfo : etfInfoList) {
            // The first two characters of the stored code are dropped before it is used for the lookup and the request.
            String code = etfInfo.getCode().substring(2);

            // Find this ETF's previous close price; if several rows match, the last one wins (as before).
            BigDecimal lastClosePrice = null;
            if (null != lastClosePriceList) {
                for (Object row : lastClosePriceList) {
                    Object[] objectArray = (Object[]) row;
                    if (code.equals(objectArray[0])) {
                        lastClosePrice = (BigDecimal) objectArray[1];
                    }
                }
            }

            // Start the worker thread that collects the data of this ETF.
            Thread collectorThread = new Thread(new CollectEtfTransactionDataThread_sohu(etfTransactionDataService,
                    etfInfoCodePrefix, code, beginDate, endDate, lastClosePrice));
            collectorThread.start();
            try {
                Thread.sleep(threadIntervalSleepMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt status for the caller and stop launching further workers.
                Thread.currentThread().interrupt();
                logger.warn("线程被中断，停止启动ETF数据采集线程", e);
                return;
            }
        }

        // Poll at a fixed interval until every worker has reported completion.
        // The property key says "stockTransactionData" but is the one this method has always used.
        int threadIntervalSleepEtfTransactionDataCollectFinish = Integer.parseInt(PropertiesUtil.getValue(THREAD_PROPERTIES, "thread.interval.sleep.stockTransactionData.collect.finish"));
        while (AbstractThread.etfTransactionDataCollectFinishAmount != etfInfoList.size()) {
            try {
                Thread.sleep(threadIntervalSleepEtfTransactionDataCollectFinish);
            } catch (InterruptedException e) {
                // Continuing to poll with the interrupt flag set would spin, so give up waiting.
                Thread.currentThread().interrupt();
                logger.warn("线程被中断，停止等待ETF数据采集完成", e);
                return;
            }
        }
    }

    /**
     * Invokes the myquant interface (an external Python script) to insert rows into the etf_transaction_data table.
     * <p>
     * The script path, the API method name and the begin/end dates are read from the ETF transaction data
     * properties file. The command executed is
     * {@code python <filePath> api <method> <beginDate> <endDate>}. The script's standard output is decoded as
     * GBK, logged line by line at debug level, and the concatenated output is compared with
     * {@code PythonApiResult.FINISH.getCode()}; a mismatch is logged as a warning.
     * <p>
     * Failures are logged and not propagated, so the method stays best-effort as before.
     */
    @Override
    public void writeEtfTransactionDataByDate_myQuant() {
        String filePath = PropertiesUtil.getValue(ETF_TRANSACTION_DATA_PROPERTIES,"adam3.api.file.path");
        String writeEtfTransactionDataByDate = PropertiesUtil.getValue(ETF_TRANSACTION_DATA_PROPERTIES,"adam3.api.method.writeEtfTransactionDataByDate");
        String beginDate = PropertiesUtil.getValue(ETF_TRANSACTION_DATA_PROPERTIES,"etfTransactionData.begin.date");
        String endDate = PropertiesUtil.getValue(ETF_TRANSACTION_DATA_PROPERTIES,"etfTransactionData.end.date");

        logger.info("调用myquant的接口，向etf_transaction_data表中插入数据，开始时间[" + beginDate + "]，结束时间[" + endDate + "]");

        Process process = null;
        try {
            // Call the adam3 interface, which inserts the records into etf_transaction_data.
            String[] parameterArray = new String[]{"python", filePath, "api", writeEtfTransactionDataByDate, beginDate, endDate};
            // stderr is inherited instead of left unread: an unread pipe can fill up and block the child process.
            process = new ProcessBuilder(parameterArray)
                    .redirectError(ProcessBuilder.Redirect.INHERIT)
                    .start();

            StringBuilder output = new StringBuilder();
            try (BufferedReader in = new BufferedReader(new InputStreamReader(process.getInputStream(), "gbk"))) {
                String actionStr;
                while ((actionStr = in.readLine()) != null) {
                    output.append(actionStr);
                    logger.debug("接口返回值：" + actionStr);
                }
            }
            // Reap the child so it does not linger after its output has been consumed.
            process.waitFor();

            if (!PythonApiResult.FINISH.getCode().equals(output.toString())){
                logger.warn("调用接口writeEtfTransactionDataByDate失败，返回值：" + output);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
            logger.warn("等待接口writeEtfTransactionDataByDate执行时线程被中断", e);
        } catch (Exception e) {
            logger.error("调用接口writeEtfTransactionDataByDate时发生异常", e);
        } finally {
            if (null != process) {
                // Releases the process handles; has no effect on a process that already exited.
                process.destroy();
            }
        }
    }

    /**
     * Fetches the daily transaction data of one ETF from the sohu HTTP interface and converts it to entities.
     * <p>
     * A request that fails with an {@link IOException} is recorded in {@code CONNECTION_EXCEPTION_FILE}
     * (for manual follow-up) and retried, up to {@code SOHU_MAX_FETCH_ATTEMPTS} attempts in total.
     *
     * @param url            full request URL
     * @param codeNum        ETF code; not used by this method, kept so existing callers continue to compile
     * @param lastClosePrice previous close price copied onto every returned row; may be {@code null}
     * @return the parsed rows (empty when the response carries no {@code hq} data or when every attempt failed
     *         with an I/O error); {@code null} when the HTTP status is not 200 or the body is not a non-empty
     *         JSON array
     */
    public List<EtfTransactionData> readEtfTransactionDataList_sohu(String url, String codeNum, BigDecimal lastClosePrice) {
        for (int attempt = 1; attempt <= SOHU_MAX_FETCH_ATTEMPTS; attempt++) {
            try {
                return fetchEtfTransactionDataList_sohu(url, lastClosePrice);
            } catch (IOException e) {
                logger.warn("url：【" + url + "】连接异常，将连接信息记录在文件【"
                        + CONNECTION_EXCEPTION_FILE + "】中，用于之后手工添加数据", e);
                recordConnectionException_sohu(url, e);
            }
        }
        logger.warn("url：【" + url + "】重试" + SOHU_MAX_FETCH_ATTEMPTS + "次后仍然连接失败，放弃采集");
        return new ArrayList<>();
    }

    /** Upper bound on HTTP attempts per URL; replaces the former unbounded recursive retry. */
    private static final int SOHU_MAX_FETCH_ATTEMPTS = 5;

    /**
     * Performs a single HTTP GET against the sohu interface and parses the response.
     * The HTTP client and response are closed before parsing starts.
     *
     * @param url            full request URL
     * @param lastClosePrice previous close price copied onto every returned row; may be {@code null}
     * @return parsed rows, or {@code null} when the status is not 200 or the body is not a non-empty JSON array
     * @throws IOException when the request or reading the response body fails
     */
    private List<EtfTransactionData> fetchEtfTransactionDataList_sohu(String url, BigDecimal lastClosePrice) throws IOException {
        HttpGet httpGet = new HttpGet(url);
        httpGet.setHeader("Accept", "application/json");
        httpGet.setHeader("Content-Type", "application/json");
        // Timeouts are in milliseconds; the values are kept exactly as they were.
        httpGet.setConfig(RequestConfig.custom()
                .setConnectionRequestTimeout(6000000)
                .setSocketTimeout(6000000)
                .setConnectTimeout(6000000)
                .build());

        String result;
        try (CloseableHttpClient httpClient = HttpClients.createDefault();
             CloseableHttpResponse response = httpClient.execute(httpGet)) {
            if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
                return null;
            }
            // Decode as GBK when the response does not declare a charset, instead of re-encoding
            // an already decoded string through the platform default charset.
            result = EntityUtils.toString(response.getEntity(), "GBK");
        }

        JSONArray baseJSONArray = null;
        try {
            baseJSONArray = JSON.parseArray(result);
        } catch (JSONException e) {
            // Handled below together with the null/empty case: the body is not a usable JSON array.
        }
        if (null == baseJSONArray || baseJSONArray.isEmpty()) {
            logger.warn("调用接口【" + url + "】，返回的数据为【" + result + "】，说明ETF停牌，没有数据");
            return null;
        }

        List<EtfTransactionData> etfTransactionDataList = new ArrayList<>();
        JSONObject baseJsonObject = (JSONObject) baseJSONArray.get(0);
        JSONArray jsonArray = baseJsonObject.getJSONArray("hq");
        if (null == jsonArray || jsonArray.isEmpty()) {
            logger.warn("url：【" + url + "】返回值中的hq为空，可能根本就没有这只ETF，或者这是新ETF还没有上市");
            return etfTransactionDataList;
        }

        // The first three characters of the returned "code" value are dropped; the rest is stored as the ETF code.
        String code = baseJsonObject.getString("code").substring(3);
        for (int i = 0; i < jsonArray.size(); i++) {
            // Row layout as consumed below: 0 date, 1 open, 2 close, 3 change amount, 4 change range,
            // 5 lowest, 6 highest, 7 volume, 8 turnover.
            JSONArray dataJsonArray = (JSONArray) jsonArray.get(i);

            EtfTransactionData etfTransactionData = new EtfTransactionData();
            etfTransactionData.setDate(DateUtil.stringToDate(dataJsonArray.getString(0)));
            etfTransactionData.setCode(code);
            etfTransactionData.setOpenPrice(new BigDecimal(dataJsonArray.getString(1)));
            etfTransactionData.setClosePrice(new BigDecimal(dataJsonArray.getString(2)));
            etfTransactionData.setChangeAmount(new BigDecimal(dataJsonArray.getString(3)));
            String changeRange = dataJsonArray.getString(4);
            if (changeRange.endsWith("%")) {
                // Strip only a trailing percent sign so a plain number does not lose its last digit.
                changeRange = changeRange.substring(0, changeRange.length() - 1);
            }
            etfTransactionData.setChangeRange(new BigDecimal(changeRange));
            etfTransactionData.setLowestPrice(new BigDecimal(dataJsonArray.getString(5)));
            etfTransactionData.setHighestPrice(new BigDecimal(dataJsonArray.getString(6)));
            etfTransactionData.setVolume(new BigDecimal(dataJsonArray.getString(7)));
            etfTransactionData.setTurnover(new BigDecimal(dataJsonArray.getString(8)));
            etfTransactionData.setLastClosePrice(lastClosePrice);
            // The turnover rate (element 9) and the up/down flag are not populated;
            // the code for them was already commented out.
            etfTransactionDataList.add(etfTransactionData);
        }
        return etfTransactionDataList;
    }

    /**
     * Appends one line describing a failed request to {@code CONNECTION_EXCEPTION_FILE}.
     * The file is created when missing; a failure to write is logged and otherwise ignored.
     *
     * @param url the URL whose request failed
     * @param e   the I/O failure to record
     */
    private void recordConnectionException_sohu(String url, IOException e) {
        // Opening a FileWriter in append mode creates the file if it does not exist yet.
        try (FileWriter fileWriter = new FileWriter(CONNECTION_EXCEPTION_FILE, true)) {
            fileWriter.write(new Date() + "       " + e.getClass().getName() + "\t" + url + "\n");
        } catch (IOException e2) {
            logger.warn("向文件【" + CONNECTION_EXCEPTION_FILE + "】中记录连接异常信息失败", e2);
        }
    }

    /**
     * Persists the rows read from the network by delegating to
     * {@code etfTransactionDataDao.writeEtfTransactionDataList}.
     * <p>
     * NOTE(review): the original comment named the target table ETF_MOVING_AVERAGE, which looks like a
     * copy-paste slip; the actual table is decided by the DAO — confirm there.
     *
     * @param etfTransactionDataList the rows to write; passed to the DAO unchanged
     */
    public void writeEtfTransactionDataList(List<EtfTransactionData> etfTransactionDataList) {
        etfTransactionDataDao.writeEtfTransactionDataList(etfTransactionDataList);
    }

    /*********************************************************************************************************************
     *
     * 									                             更新每天的数据
     *
     *********************************************************************************************************************/
    /**
     * Updates the last_close_price, change_amount and change_range columns of etf_transaction_data
     * for a date range.
     * <p>
     * The range is taken from the properties {@code etfTransactionData.lastClosePrice.beginDate} and
     * {@code etfTransactionData.lastClosePrice.endDate}; the update itself is delegated to the DAO.
     */
    @Override
    public void writeEtfTransactionDataLastClosePriceByDate() {
        etfTransactionDataDao.writeEtfTransactionDataLastClosePriceByDate(
                PropertiesUtil.getValue(ETF_TRANSACTION_DATA_PROPERTIES, "etfTransactionData.lastClosePrice.beginDate"),
                PropertiesUtil.getValue(ETF_TRANSACTION_DATA_PROPERTIES, "etfTransactionData.lastClosePrice.endDate"));
    }

    /**
     * Calculates the moving-average data of all ETFs for a single day.
     * <p>
     * The day is read from the property {@code etfTransactionData.movingAverage.date} and handed to the DAO.
     */
    @Override
    public void writeEtfTransactionDataMAByDate() {
        etfTransactionDataDao.writeEtfTransactionDataMAByDate(
                PropertiesUtil.getValue(ETF_TRANSACTION_DATA_PROPERTIES, "etfTransactionData.movingAverage.date"));
    }

    /**
     * Calculates the Hei Kin Ashi data of all ETFs for a single day.
     * <p>
     * The day is read from the property {@code etfTransactionData.heiKinAshi.date} and handed to the DAO.
     */
    @Override
    public void writeEtfTransactionDataHeiKinAshiByDate() {
        etfTransactionDataDao.writeEtfTransactionDataHeiKinAshiByDate(
                PropertiesUtil.getValue(ETF_TRANSACTION_DATA_PROPERTIES, "etfTransactionData.heiKinAshi.date"));
    }

    /**
     * Calculates the bias (deviation rate) of all ETFs for a single day.
     * <p>
     * The day is read from the property {@code etfTransactionData.bias.date} and handed to the DAO.
     */
    @Override
    public void writeEtfTransactionDataBiasByDate() {
        etfTransactionDataDao.writeEtfTransactionDataBiasByDate(
                PropertiesUtil.getValue(ETF_TRANSACTION_DATA_PROPERTIES, "etfTransactionData.bias.date"));
    }

    /**
     * Updates the MACD-related columns of ETF_TRANSACTION_DATA for a single day.
     * <p>
     * The original comment lists the columns as EMA15, EMA26, DIF and DEA — verify the names against the
     * schema. The day is read from the property {@code etfTransactionData.macd.date} and handed to the DAO.
     */
    @Override
    public void writeEtfTransactionDataMACDByDate() {
        etfTransactionDataDao.writeEtfTransactionDataMACDByDate(
                PropertiesUtil.getValue(ETF_TRANSACTION_DATA_PROPERTIES, "etfTransactionData.macd.date"));
    }

    /**
     * Updates the RSV, K and D columns of ETF_TRANSACTION_DATA for a single day.
     * <p>
     * The day is read from the property {@code etfTransactionData.kd.date} and handed to the DAO.
     */
    @Override
    public void writeEtfTransactionDataKDByDate() {
        etfTransactionDataDao.writeEtfTransactionDataKDByDate(
                PropertiesUtil.getValue(ETF_TRANSACTION_DATA_PROPERTIES, "etfTransactionData.kd.date"));
    }

    /**
     * Updates the correlation***_with_avg_c_p columns of ETF_TRANSACTION_DATA for a date range.
     * <p>
     * The range is taken from the properties {@code etfTransactionData.correlation.beginDate} and
     * {@code etfTransactionData.correlation.endDate}; the update itself is delegated to the DAO.
     */
    @Override
    public void writeEtfTransactionDataCorrelationByDate() {
        etfTransactionDataDao.writeEtfTransactionDataCorrelationByDate(
                PropertiesUtil.getValue(ETF_TRANSACTION_DATA_PROPERTIES, "etfTransactionData.correlation.beginDate"),
                PropertiesUtil.getValue(ETF_TRANSACTION_DATA_PROPERTIES, "etfTransactionData.correlation.endDate"));
    }

    /*********************************************************************************************************************
     *
     * 									                             更新全部的数据
     *
     *********************************************************************************************************************/
    /**
     * Bulk-updates the last_close_price, change_amount and change_range columns of the
     * etf_transaction_data table. Takes no date range; the work is delegated entirely to the DAO.
     */
    @Override
    public void writeEtfTransactionDataLastClosePrice() {
        etfTransactionDataDao.writeEtfTransactionDataLastClosePrice();
    }

    /**
     * Calculates the 5-, 10-, 20-, 60-, 120- and 250-day moving averages by invoking the corresponding
     * DAO method for each period, in that order.
     */
    @Override
    public void writeEtfTransactionDataMA() {
        etfTransactionDataDao.writeEtfTransactionDataMA5();
        etfTransactionDataDao.writeEtfTransactionDataMA10();
        etfTransactionDataDao.writeEtfTransactionDataMA20();
        etfTransactionDataDao.writeEtfTransactionDataMA60();
        etfTransactionDataDao.writeEtfTransactionDataMA120();
        etfTransactionDataDao.writeEtfTransactionDataMA250();
    }

    /**
     * Calculates the bias (deviation rate) of all ETFs; delegates to the DAO.
     */
    @Override
    public void writeEtfTransactionDataBias() {
        etfTransactionDataDao.writeEtfTransactionDataBias();
    }

    /**
     * Calculates the Hei Kin Ashi related columns; delegates to the DAO.
     */
    public void writeEtfTransactionDataHeiKinAshi(){
        etfTransactionDataDao.writeEtfTransactionDataHeiKinAshi();
    }

    /**
     * Calculates the MACD of all ETFs; delegates to the DAO.
     */
    @Override
    public void writeEtfTransactionDataMACD() {
        etfTransactionDataDao.writeEtfTransactionDataMACD();
    }

    /**
     * Calculates the KD indicator of all ETFs; delegates to the DAO.
     */
    @Override
    public void writeEtfTransactionDataKD() {
        etfTransactionDataDao.writeEtfTransactionDataKD();
    }

    /**
     * Calculates the correlation5_with*** columns of all ETFs; delegates to the DAO.
     */
    @Override
    public void writeEtfTransactionDataCorrelation() {
        etfTransactionDataDao.writeEtfTransactionDataCorrelation();
    }

    /**
     * Not implemented for this service: the argument is ignored and {@code null} is always returned.
     *
     * @param list ignored
     * @return always {@code null}
     */
    @Override
    public String listToString(List list) {
        return null;
    }
}
